{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LlrEjmtJNpuX"
      },
      "source": [
        "# Ingesting data with BigQuery\n",
        "\n",
        "This notebook demonstrates how to consume data contained in BigQuery and index it into Elasticsearch. This notebook is based on the article [Ingesting data with BigQuery](https://www.elastic.co/search-labs/blog/ingesting-data-with-big-query)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "GNaAN-GNO5qp"
      },
      "source": [
        "## Installing dependencies and importing packages"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "qgclZayCk1Ct",
        "outputId": "7da1e962-ead6-4016-b2e5-12a019885d86"
      },
      "outputs": [],
      "source": [
        "!pip install google-cloud-bigquery elasticsearch==8.16 google-auth"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "rAesontNXpLu"
      },
      "outputs": [],
      "source": [
        "from elasticsearch import Elasticsearch, exceptions\n",
        "from google.cloud import bigquery\n",
        "from google.colab import auth\n",
        "from getpass import getpass\n",
        "\n",
        "import json"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "NwOmnk99Pfh3"
      },
      "source": [
        "## Declaring variables"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-4sV9fiXdBwj"
      },
      "source": [
        "This code will create inputs where you can enter your credentials.\n",
        "Here you can learn how to retrieve your Elasticsearch credentials: [Finding Your Cloud ID](https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "GVKJKfFpPWuj",
        "outputId": "21c6f4e3-8cb5-4a2c-8efe-0e45c3c6b1c4"
      },
      "outputs": [],
      "source": [
        "ELASTICSEARCH_ENDPOINT = getpass(\"Elasticsearch endpoint: \")\n",
        "ELASTIC_API_KEY = getpass(\"Elastic Api Key: \")\n",
        "\n",
        "\n",
        "# Google Cloud project name and BigQuery dataset name\n",
        "PROJECT_ID = \"elasticsearch-bigquery\"\n",
        "# dataset_id in format <your-project-name>.<your-dataset-name>\n",
        "DATASET_ID = f\"{PROJECT_ID}.server_logs\""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3O2HclcYHEsS"
      },
      "source": [
        "## Instance a Elasticsearch client"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "1LWiop8NYiQF"
      },
      "outputs": [],
      "source": [
        "auth.authenticate_user()\n",
        "\n",
        "# Elasticsearch client\n",
        "es_client = Elasticsearch(\n",
        "    ELASTICSEARCH_ENDPOINT,\n",
        "    api_key=ELASTIC_API_KEY,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "9lvPHaXjPlfu"
      },
      "source": [
        "## Creating mappings"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "tc88YzAYw31e"
      },
      "outputs": [],
      "source": [
        "try:\n",
        "    es_client.indices.create(\n",
        "        index=\"bigquery-logs\",\n",
        "        body={\n",
        "            \"mappings\": {\n",
        "                \"properties\": {\n",
        "                    \"status_code_description\": {\"type\": \"match_only_text\"},\n",
        "                    \"status_code\": {\"type\": \"keyword\"},\n",
        "                    \"@timestamp\": {\"type\": \"date\"},\n",
        "                    \"ip_address\": {\"type\": \"ip\"},\n",
        "                    \"http_method\": {\"type\": \"keyword\"},\n",
        "                    \"endpoint\": {\"type\": \"keyword\"},\n",
        "                    \"response_time\": {\"type\": \"integer\"},\n",
        "                }\n",
        "            }\n",
        "        },\n",
        "    )\n",
        "except exceptions.RequestError as e:\n",
        "    if e.error == \"resource_already_exists_exception\":\n",
        "        print(\"Index already exists.\")\n",
        "    else:\n",
        "        raise e"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "AOtwAfPXP38Z"
      },
      "source": [
        "## Getting data from BigQuery"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "DyKtsjXRXB2S",
        "outputId": "7734ad7e-cb11-42cc-b97d-e9241cab6b17"
      },
      "outputs": [],
      "source": [
        "client = bigquery.Client(project=PROJECT_ID)\n",
        "# Getting tables from dataset\n",
        "tables = client.list_tables(DATASET_ID)\n",
        "\n",
        "data = {}\n",
        "\n",
        "for table in tables:\n",
        "    # Table id must be in format <dataset_name>.<table_name>\n",
        "    table_id = f\"{DATASET_ID}.{table.table_id}\"\n",
        "\n",
        "    print(f\"Processing table: {table.table_id}\")\n",
        "\n",
        "    # Query to retrieve BigQuery tables data\n",
        "    query = f\"\"\"\n",
        "        SELECT *\n",
        "        FROM `{table_id}`\n",
        "    \"\"\"\n",
        "\n",
        "    query_job = client.query(query)\n",
        "\n",
        "    results = query_job.result()\n",
        "\n",
        "    print(f\"Results for table: {table.table_id}:\")\n",
        "\n",
        "    data[table.table_id] = []\n",
        "\n",
        "    for row in results:\n",
        "        # Saving data with key=table_id\n",
        "        data[table.table_id].append(dict(row))\n",
        "        print(row)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "UAznwqXStJ39",
        "outputId": "508a3255-43f7-4828-87d5-997cd04ca427"
      },
      "outputs": [],
      "source": [
        "# variable with data\n",
        "logs_data = data[\"logs\"]\n",
        "\n",
        "\n",
        "print(logs_data)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2s4Tr6wBP773"
      },
      "source": [
        "## Indexing to Elasticsearch"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "C4goyH6ZbDJK",
        "outputId": "cfb4af41-1ef1-40da-a2da-6c0aa83633d3"
      },
      "outputs": [],
      "source": [
        "bulk_data = []\n",
        "\n",
        "for log_entry in logs_data:\n",
        "    # Convert timestamp to ISO 8601 string\n",
        "    timestamp_iso8601 = log_entry[\"_timestamp\"].isoformat()\n",
        "\n",
        "    # Prepare action metadata\n",
        "    action_metadata = {\n",
        "        \"index\": {\n",
        "            \"_index\": \"bigquery-logs\",\n",
        "            \"_id\": f\"{log_entry['ip_address']}-{timestamp_iso8601}\",\n",
        "        }\n",
        "    }\n",
        "\n",
        "    # Prepare document\n",
        "    document = {\n",
        "        \"ip_address\": log_entry[\"ip_address\"],\n",
        "        \"status_code\": log_entry[\"status_code\"],\n",
        "        \"@timestamp\": timestamp_iso8601,\n",
        "        \"http_method\": log_entry[\"http_method\"],\n",
        "        \"endpoint\": log_entry[\"endpoint\"],\n",
        "        \"response_time\": log_entry[\"response_time\"],\n",
        "        \"status_code_description\": log_entry[\"status_code_description\"],\n",
        "    }\n",
        "\n",
        "    # Append to bulk data\n",
        "    bulk_data.append(action_metadata)\n",
        "    bulk_data.append(document)\n",
        "\n",
        "print(bulk_data)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "WPEwsJrFbDHQ",
        "outputId": "ab5904f7-21c1-4596-fb06-55569cd9eb17"
      },
      "outputs": [],
      "source": [
        "try:\n",
        "    # Indexing data\n",
        "    response = es_client.bulk(body=bulk_data)\n",
        "\n",
        "    if response[\"errors\"]:\n",
        "        print(\"Errors while indexing:\")\n",
        "        for item in response[\"items\"]:\n",
        "            if \"error\" in item[\"index\"]:\n",
        "                print(item[\"index\"][\"error\"])\n",
        "    else:\n",
        "        print(\"Documents indexed successfully.\")\n",
        "except Exception as e:\n",
        "    print(f\"Bulk indexing failed: {e}\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "uxix_o8LQDup"
      },
      "source": [
        "# Searching data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "285_MwI8yknk",
        "outputId": "67d43aca-b05d-43f7-c730-1fa3bc0c4662"
      },
      "outputs": [],
      "source": [
        "response = es_client.search(\n",
        "    index=\"bigquery-logs\",\n",
        "    body={\n",
        "        \"query\": {\"match\": {\"status_code_description\": \"error\"}},\n",
        "        \"sort\": [{\"@timestamp\": {\"order\": \"desc\"}}],\n",
        "        \"aggs\": {\"by_ip\": {\"terms\": {\"field\": \"ip_address\", \"size\": 10}}},\n",
        "    },\n",
        ")\n",
        "\n",
        "# Print results\n",
        "formatted_json = json.dumps(response.body, indent=4)\n",
        "print(formatted_json)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "S6WZMJayyzxh"
      },
      "source": [
        "## Deleting\n",
        "\n",
        "Finally, we can delete the resources used to prevent them from consuming resources."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "9UcQwa41yy_x",
        "outputId": "02a33d89-b57c-4273-ba93-5b6441a4f91e"
      },
      "outputs": [],
      "source": [
        "# Cleanup - Delete Index\n",
        "es_client.indices.delete(index=\"bigquery-logs\", ignore=[400, 404])"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
